package arp.message.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.LockSupport;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import arp.process.publish.Message;
import arp.process.publish.ProcessMessageReceiver;

public class KafkaMessageReceiver implements ProcessMessageReceiver {
	private String servers;

	private KafkaConsumer<String, Object> consumer;

	private int lastReceiveMessageCount;

	private KafkaMessageDeserializationStrategy<Object> deserializationStrategy;

	public KafkaMessageReceiver(String servers, String consumerGroup) {
		this(servers, consumerGroup, new FSTDeserializationStrategy());
	}

	public KafkaMessageReceiver(String servers, String consumerGroup,
			KafkaMessageDeserializationStrategy<?> deserializationStrategy) {
		this.servers = servers;
		this.deserializationStrategy = (KafkaMessageDeserializationStrategy<Object>) deserializationStrategy;
		this.consumer = createConsumer(consumerGroup);
	}

	@Override
	public synchronized List<Message> receive() throws Exception {
		if (lastReceiveMessageCount == 0) {
			LockSupport.parkNanos(1L);
		}
		List<Message> messageList = new ArrayList<>();
		ConsumerRecords<String, Object> records = consumer.poll(Duration
				.ofMillis(0));
		for (ConsumerRecord<String, Object> record : records) {
			deserializationStrategy.deserialize(record);
			messageList.add(deserializationStrategy.deserialize(record));
		}
		lastReceiveMessageCount = messageList.size();
		return messageList;
	}

	private KafkaConsumer<String, Object> createConsumer(String group) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
				StringDeserializer.class);
		deserializationStrategy.configValueDeserializerClass(props);
		KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
		List<String> topics = new ArrayList<>();
		topics.add("arp_process_message");
		consumer.subscribe(topics);
		return consumer;
	}

}
